package com.mvc.core.utils.task;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 
 * 延时队列任务管理
 * 
 * @author 李涛
 * @version 创建时间：2018年6月16日 下午3:35:39
 */
@Component
public class TaskQueueBean {

	private static final Logger LOG = LoggerFactory.getLogger(TaskQueueBean.class);

	private static volatile boolean started = false;

	private TaskQueueBean() {
	}

	private static class LazyHolder {
		private static TaskQueueBean taskQueueDaemonThread = new TaskQueueBean();
	}

	public static TaskQueueBean getInstance() {
		return LazyHolder.taskQueueDaemonThread;
	}

	/**
	 * 执行任务的线程
	 */
	private ExecutorService executor = null;

	/**
	 * 创建一个最初为空的新 DelayQueue
	 */
	private DelayQueue<DelayedTask<Runnable>> queue = null;

	/**
	 * 守护线程
	 */
	private Thread daemonThread;

	/**
	 * 初始化守护线程
	 */
	@PostConstruct
	public synchronized void start() {
		// 1.初始化线程池
		if (!started) {
			started = true;
			executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
			queue = new DelayQueue<>();
			// 2.判断是否启动
			if (daemonThread != null && daemonThread.isInterrupted()) {
				daemonThread.start();
				return;
			}
		}
		daemonThread = new Thread() {
			public void run() {
				try {
					execute();
				} catch (InterruptedException e) {
					daemonThread.interrupt();
				}
			}
		};
		daemonThread.setDaemon(true);
		daemonThread.setName("DelayedTask");
		daemonThread.start();
		LOG.info("~~~~~~~~~~~~~~~~~~~~延时任务开启~~~~~~~~~~~~~~~~~~~~~~~~~");
	}

	private void execute() throws InterruptedException {
		LOG.info("[ task start {} ]:", System.currentTimeMillis());
		while (started) {
			// 从延迟队列中取值,如果没有对象过期则队列一直等待，
			DelayedTask<Runnable> t1 = queue.take();
			if (t1 != null) {
				// 修改问题的状态
				Runnable task = t1.getTask();
				if (task == null) {
					continue;
				}
				executor.execute(task);
				LOG.info("[ {}  task {} execute  ] ", t1.getN(), t1.getName());
			}
		}
	}

	/**
	 * 添加任务， time 延迟时间 task 任务 用户为问题设置延迟时间
	 */
	public void put(long time, Runnable task, String taskName) {
		if (!started) {
			throw new UnsupportedOperationException("请先启动taskQueneBean！");
		}
		// 转换成ns
		long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
		// 创建一个任务
		DelayedTask<Runnable> k = new DelayedTask<Runnable>(nanoTime, task, taskName);
		// 将任务放在延迟的队列中
		queue.put(k);
		LOG.info("新任务：{}加入队列，当前队列任务数量：{}", taskName, queue.size());
	}

	/**
	 * 结束
	 * 
	 * @param task
	 */
	public boolean endTask(DelayedTask<Runnable> task) {
		if (!started) {
			throw new UnsupportedOperationException("请先启动taskQueneBean！");
		}
		return queue.remove(task);
	}

	/**
	 * 手动关闭任务
	 */
	public synchronized void stop() {
		if (started) {
			LOG.info("shutdown TaskQueueBean");
			started = false;
			daemonThread.interrupt();
			executor.shutdown();
			daemonThread = null;
			queue = null;
		}
	}
}
